package com.wdl.datarest.implementation;

import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.InterruptedException;
import java.net.DatagramPacket;
import java.net.InetAddress;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import com.wdl.datarest.data.Device;
import com.wdl.datarest.data.Alarm;
import com.wdl.datarest.data.Person;
import com.wdl.datarest.data.DeviceRepository;
import com.wdl.datarest.data.AlarmRepository;
import com.wdl.datarest.data.AlarmType;
import com.wdl.datarest.data.PersonRepository;
import com.wdl.datarest.prometheus.HealthData;
import com.wdl.datarest.data.SleepMode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thread worker for processing device data and save in DeviceData or Alarm table.
 */
@Service
@Scope("prototype")
public class DevWorkerThread implements Runnable{
    private static Logger log = LoggerFactory.getLogger("DevWorkerThread");

    /* 
     * At most 100000 devices. In total 10 threads.
     * One thread takes care of 10000 devices. 1data/dev/5s. 
     * So around 2000data/s. Thread handles 1data/10ms.
     */ 
    private static final int DEV_DATA_QUEUE_SIZE = 2000;

    private volatile boolean stopped;
    private Thread thread;
    private int threadId;

    private final BlockingQueue<DatagramPacket> queue = new ArrayBlockingQueue<DatagramPacket>(DEV_DATA_QUEUE_SIZE);

    @Autowired
    private DeviceRepository deviceRepo;

    @Autowired
    private AlarmRepository alarmRepo;

    @Autowired
    private PersonRepository personRepo;

    @Autowired
    private DevCnfgCache devCnfgCache;

    @Autowired
    private HealthData healthData;

    public void setThreadId(int threadId){
        this.threadId = threadId;
    }

    public int getThreadId(){
        return this.threadId;
    }

    public boolean isQueueFull() {
        if (queue.size() >= DEV_DATA_QUEUE_SIZE){
            return true;
        }
        return false;
    }

    public void start() {
        log.info("DevWorkerThread-" + this.threadId + " start()");
        stopped = false;
        thread = new Thread(this, "DevWorkerThread-" + this.threadId);
        thread.start();
    }

    public void stop() {
        log.info("DevDataThead-" + threadId + " stop()");
        stopped = true;
        thread.interrupt();
    }

    public boolean add(DatagramPacket devData) throws UnsupportedEncodingException {
        // queue.put is blocking, so use queue.offer instead
        boolean insertionOK = queue.offer(devData);

        if (!insertionOK) {
            log.error("DevWorkerThread-" + this.threadId + " add(), insertion failure, The queue size is now " + queue.size());
            return insertionOK;
        }

        if (log.isDebugEnabled()) {
            log.debug("DevWorkerThread-" + this.threadId + " add(),  insertion successfully. The queue size is now " + queue.size());
        }

        return insertionOK;
    }

    public void run() {
        while (true) {
            // This call will block until there is at least one event
            try {
                DatagramPacket devDataItem = queue.take();
                if (devDataItem != null){
                    if (log.isDebugEnabled()) {
                        log.debug("The queue size is now " + queue.size() + ". Data just taken: ");
                        DevMonTask.printPacket(devDataItem.getData());
                    }
                    processDevData(devDataItem);
                } else {
                    log.warn("devDataItem is null. The queue size is now " + queue.size());
                }
            } catch (InterruptedException ex) {
                log.error("InterruptedException Caught, processDevData() is not called!", ex);
                if (stopped){ 
                    return;
                }
                continue;
            } catch (Exception e){
                log.info("Exception Caught, processDevData() is not called!", e);
                if(stopped){
                    return;
                }
                continue;
            }
        }
    }

    private void processDevData(DatagramPacket packet) {
        byte[] bytePacket = new byte[20];
        byte[] byteDevId = new byte[10];
        String strDevId = null;

        bytePacket = packet.getData();
        if (isValidPacket(bytePacket) == false) {
            return;
        }
        
        // Dev ID  (10 bytes)
        for (int i = 0; i < 10;i++) {
            byteDevId[i] = (byte) bytePacket[4+i];
        }

        if (bytePacket[1] == (byte)0xDB) {           
            byte[] byteResponseSU = new byte[512+9];
            int numFrame = DevMonTask.getNumFrame(byteDevId[1]);
            int intSize = DevMonTask.getImageSize(byteDevId[1]);
            
            for (int i=0; i<numFrame; i++) {    
                int j=0;
                // DB1 Header: always 0xE5
                byteResponseSU[0] = (byte) 0xE5;
                // DB2 Type: 0xF1(config data), 0xF2(reserved), 0xF3(ACK), 0xF4(image info), 0xF5(image data)
                byteResponseSU[1] = (byte)0xF5;
                // DB3 Frame number
                byteResponseSU[2] = (byte)((i + 1)&0xFF);
                // DB4 Data length and data
                if(i<numFrame-1) {
                    byteResponseSU[3] = 2;
                    byteResponseSU[4] = 0;
                    for (j=0; j<512; j++) {
                        byteResponseSU[j+5] = DevMonTask.getImageData(byteDevId[1])[i*512+j];
                    }
                } else {
                    byteResponseSU[3] = (byte)((intSize%512)/256);
                    byteResponseSU[4] = (byte)((intSize%512)%256);
                    for (j=0; j<(intSize%512); j++) {
                        byteResponseSU[j+5] = DevMonTask.getImageData(byteDevId[1])[i*512+j];
                    }
                }

                // DB5 CRC
                if(i<numFrame-1) {
                    byte[] byteCRCMsg = new byte[516];
                    for (int k= 0; k < byteCRCMsg.length; k++){
                        byteCRCMsg[k] = byteResponseSU[k+1];
                    }
                    short shortCRC = DevMonTask.getCrc16(byteCRCMsg, byteCRCMsg.length);
                    byte[] byteCRC = DevMonTask.short2bytes(shortCRC);
                    byteResponseSU[j+6] = byteCRC[1];
                    byteResponseSU[j+7] = byteCRC[0];    
                } else {
                    byte[] byteCRCMsg = new byte[intSize%512+4];
                    for (int k= 0; k < byteCRCMsg.length; k++){
                        byteCRCMsg[k] = byteResponseSU[k+1];
                    }
                    short shortCRC = DevMonTask.getCrc16(byteCRCMsg, byteCRCMsg.length);
                    byte[] byteCRC = DevMonTask.short2bytes(shortCRC);
                    byteResponseSU[j+6] = byteCRC[1];
                    byteResponseSU[j+7] = byteCRC[0];    
                }
    
                // DB6 Tail
                byteResponseSU[j+8] = (byte)0xE6;
                
                InetAddress address = packet.getAddress();
                int port = packet.getPort();
                DatagramPacket responsePacket = new DatagramPacket(byteResponseSU, j+9, address, port);
    
                try{
                    DevMonTask.serverSocket.send(responsePacket);
                    Thread.sleep(200);
                } catch (IOException ioe){
                    log.error("DevWorkerThread: Failed to send response to device.", ioe);
                    return;
                } catch (Exception e){
                    log.error("DevWorkerThread: Failed to send response to device.", e);
                    return;
                }
            }
            return;
        }

        try {
            strDevId = new String(byteDevId, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            log.error("Failed to get device ID", e);
        }

        // Find device in cache. If found it means this device assigned to a person.
        CacheData cacheData = devCnfgCache.getCnfgDataFromCacheByDeviceIdString(strDevId);
        if (cacheData == null) {
            log.warn("Device " + strDevId + " is not in cache, data is being ignored.");
            return;
        }

        Device dev = deviceRepo.findByIndex(cacheData.getDeviceIndex());
        if (dev != null) {
            long currentTime = new Date().getTime();
            dev.setUpdateTime(currentTime);
            deviceRepo.save(dev);
            cacheData.setDeviceUpdateTime(currentTime);
            devCnfgCache.updateCacheByDeviceIdString(strDevId, cacheData);
        }

        long longPersonId = cacheData.getPersonId();
        Person person = personRepo.findByIndex(longPersonId);
        if (person == null) {
            log.error("Failed to find personId " + longPersonId + " from DB. Ignore request.");
            return;
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Found person " + longPersonId + " from DB.");
            }
        }

        if (person.isDeviceAlarmFlag() == true) {
            person.setDeviceAlarmFlag(false);
            person.setHideDeviceAlarm(false);
            acknowledgeAlarm(person, AlarmType.DEVICE);
        }

        if (bytePacket[1] == (byte)0xAA) {
            if (log.isDebugEnabled()) {
                log.debug("Data type received is 0xAA for heartbeat/breatherate/sleepmode.");
            }

            ///////////////////////////////////
            // A1. process hearbeat and breathe rate data
            ///////////////////////////////////
            healthData.processHeartBreatheData(cacheData, bytePacket[14]&0xFF, bytePacket[15]&0xFF);

            ///////////////////////////////////
            // A2. process sleep mode
            ///////////////////////////////////
            SleepMode sleepMode = SleepMode.HEAVY;
            if (bytePacket[16] == (byte)0xB1) {
                sleepMode = SleepMode.HEAVY;
            } else if (bytePacket[16] == (byte)0xB2) {
                sleepMode = SleepMode.LIGHT;
            } else if (bytePacket[16] == (byte)0xB3) {
                sleepMode = SleepMode.AWAKE;
            } else {
                log.warn("Unknown sleep mode, not supported: " + String.format("%X", bytePacket[16]));
            }

            healthData.processSleepData(cacheData, sleepMode);

            ///////////////////////////////////
            // A3. process heartbeat alarm
            ///////////////////////////////////
            if (bytePacket[17] == (byte)0xA1) {
                // heartbeat abnormal, skip following alarms if an alarm already fired!
                if (!person.isHeartAlarmFlag()) {
                    Alarm hbAlarm = new Alarm();
                    hbAlarm.setRaisedTime(new Date().getTime());
                    hbAlarm.setPersonId(longPersonId);
                    hbAlarm.setDeviceId(strDevId);
                    hbAlarm.setInstitutionId(person.getInstitution() == null ? 1: person.getInstitution().getId());
                    hbAlarm.setPersonName(person.getPersonName());
                    hbAlarm.setType(com.wdl.datarest.data.AlarmType.HEART);
                    hbAlarm.setValue(bytePacket[14]);
                    Alarm savedAlarm = alarmRepo.save(hbAlarm);
                    person.setHeartAlarmFlag(true);
                    person.setHideHeartAlarm(false);
                    person.setHeartAlarmId(savedAlarm.getId());
                }
            } else if (bytePacket[17] == (byte)0xA0) {
                if (person.isHeartAlarmFlag()) {
                    person.setHeartAlarmFlag(false);
                    person.setHideHeartAlarm(false);
                    acknowledgeAlarm(person, AlarmType.HEART);
                }
            } else {
                log.warn("Unknown HeartBeat alarm value, not supported: " + String.format("%X", bytePacket[17]));
            }

            ///////////////////////////////////
            // A4. process breathe rate alarm
            ///////////////////////////////////
            if (bytePacket[18] == (byte)0xA2) {
                // breathe abnormal, skip following alarms if an alarm already fired!
                if (!person.isBreatheAlarmFlag()) {
                    Alarm breatheAlarm = new Alarm();
                    breatheAlarm.setRaisedTime(new Date().getTime());
                    breatheAlarm.setPersonId(longPersonId);
                    breatheAlarm.setDeviceId(strDevId);
                    breatheAlarm.setInstitutionId(person.getInstitution() == null ? 1 : person.getInstitution().getId());
                    breatheAlarm.setPersonName(person.getPersonName());
                    breatheAlarm.setType(com.wdl.datarest.data.AlarmType.BREATHE);
                    breatheAlarm.setValue(bytePacket[15]);
                    Alarm savedAlarm = alarmRepo.save(breatheAlarm);
                    person.setBreatheAlarmFlag(true);
                    person.setHideBreatheAlarm(false);
                    person.setBreatheAlarmId(savedAlarm.getId());
                }
            } else if (bytePacket[18] == (byte)0xA0) {
                if (person.isBreatheAlarmFlag()) {
                    person.setBreatheAlarmFlag(false);
                    person.setHideBreatheAlarm(false);
                    acknowledgeAlarm(person, AlarmType.BREATHE);
                }
            } else {
                log.warn("Unknown breathe rate alarm value, not supported: " + String.format("%X", bytePacket[18]));
            }
        } else if (bytePacket[1] == (byte)0xBB) {
            if (log.isDebugEnabled()) {
                log.debug("Data type received is 0xBB for bed alarm, body movement, and wet alarm.");
            }

            ///////////////////////////////////
            // B1. process up, edge, away alarm
            ///////////////////////////////////
            Alarm bedAlarm = new Alarm();
            boolean saveNewAlarm = false;
            boolean needAckAlarm = false;
            bedAlarm.setRaisedTime(new Date().getTime());
            bedAlarm.setPersonId(longPersonId);
            bedAlarm.setDeviceId(strDevId);
            bedAlarm.setValue(bytePacket[14]);
            bedAlarm.setInstitutionId(person.getInstitution() == null ? 1 : person.getInstitution().getId());
            bedAlarm.setPersonName(person.getPersonName());

            if(bytePacket[14] == (byte)0xC1) {
                if (log.isDebugEnabled()) {
                    log.debug("Sit up alarm");
                }

                if (person.isUpAlarmFlag() && person.getUpAlarmId() > 0) {
                    // Already up alarm and not manually acked, do nothing
                    person.setHideUpAlarm(false);
                } else {
                    if(person.isSideAlarmFlag() || person.isAwayAlarmFlag()) {
                        needAckAlarm = true;
                    }
                    bedAlarm.setType(AlarmType.UP);
                    person.setUpAlarmFlag(true);
                    person.setHideUpAlarm(false);
                    person.setSideAlarmFlag(false);
                    person.setAwayAlarmFlag(false);
                    saveNewAlarm = true;
                }
            } else if (bytePacket[14] == (byte)0xC2) {
                if (log.isDebugEnabled()) {
                    log.debug("Bed edge alarm");
                }

                if (person.isSideAlarmFlag() && person.getSideAlarmId() > 0) {
                    // Already side alarm and not manually acked, do nothing
                    person.setHideSideAlarm(false);
                } else {
                    if(person.isUpAlarmFlag() || person.isAwayAlarmFlag()) {
                        needAckAlarm = true;
                    }
                    bedAlarm.setType(AlarmType.SIDE);
                    person.setSideAlarmFlag(true);
                    person.setHideSideAlarm(false);
                    person.setUpAlarmFlag(false);
                    person.setAwayAlarmFlag(false);
                    saveNewAlarm = true;
                }
            } else if (bytePacket[14] == (byte)0xC3) {
                if (log.isDebugEnabled()) {
                    log.debug("Away alarm");
                }

                if (person.isAwayAlarmFlag() && person.getAwayAlarmId() > 0) {
                    // Already away alarm and not manually acked, do nothing
                    person.setHideAwayAlarm(false);
                } else {
                    if(person.isUpAlarmFlag() || person.isSideAlarmFlag()) {
                        needAckAlarm = true;
                    }
                    bedAlarm.setType(AlarmType.AWAY);
                    person.setHideAwayAlarm(false);
                    person.setAwayAlarmFlag(true);
                    person.setUpAlarmFlag(false);
                    person.setSideAlarmFlag(false);
                    saveNewAlarm = true;
                }
            } else if (bytePacket[14] == (byte)0xC4) {
                if (log.isDebugEnabled()) {
                    log.debug("Lie in bed");
                }

                if (person.isUpAlarmFlag() || person.isSideAlarmFlag() || person.isAwayAlarmFlag()) {
                    needAckAlarm = true;
                    person.setUpAlarmFlag(false);
                    person.setSideAlarmFlag(false);
                    person.setAwayAlarmFlag(false);
                }
            } else if (bytePacket[14] == (byte)0xC7) {
                if (log.isDebugEnabled()) {
                    log.debug("ring alarm");
                }

                if (person.isRingAlarmFlag() && person.getRingAlarmId() > 0) {
                    // Already ring, nothing to do
                    person.setHideRingAlarm(false);
                } else { 
                    bedAlarm.setType(AlarmType.RING);
                    person.setRingAlarmFlag(true);
                    saveNewAlarm = true;
                }
            } else if (bytePacket[14] == (byte)0xC0) {
                if (log.isDebugEnabled()) {
                    log.debug("Alarm is cancelled manually");
                }

                if (person.isLeftAlarmFlag()) {
                    person.setLeftAlarmFlag(false);
                    person.setHideLeftAlarm(false);
                    acknowledgeAlarm(person, AlarmType.LEFT);
                }

                if (person.isRightAlarmFlag()) {
                    person.setRightAlarmFlag(false);
                    person.setHideRightAlarm(false);
                    acknowledgeAlarm(person, AlarmType.RIGHT);
                }

                if (person.isHeartAlarmFlag()) {
                    person.setHeartAlarmFlag(false);
                    person.setHideHeartAlarm(false);
                    acknowledgeAlarm(person, AlarmType.HEART);
                }

                if (person.isBreatheAlarmFlag()) {
                    person.setBreatheAlarmFlag(false);
                    person.setHideBreatheAlarm(false);
                    acknowledgeAlarm(person, AlarmType.BREATHE);
                }

                if (person.isRingAlarmFlag()) {
                    person.setRingAlarmFlag(false);
                    person.setHideRingAlarm(false);
                    acknowledgeAlarm(person, AlarmType.RING);
                }

                if (person.isTurnOverReminderAlarmFlag()) {
                    person.setTurnOverReminderAlarmFlag(false);
                    acknowledgeAlarm(person, AlarmType.TURN_OVER);
                }

                if (person.isUpAlarmFlag() || person.isSideAlarmFlag() || person.isAwayAlarmFlag()) {
                    needAckAlarm = true;
                }

                person.setUpAlarmFlag(false);
                person.setSideAlarmFlag(false);
                person.setAwayAlarmFlag(false);
                person.setRingAlarmFlag(false);
                person.setHideUpAlarm(false);
                person.setHideSideAlarm(false);
                person.setHideAwayAlarm(false);
                person.setHideRingAlarm(false);
            } else {
                log.warn("This bed alarm value is not supported yet: " + bytePacket[14]);
            }

            if (needAckAlarm == true) {
                // Ack existing alarm firstly to avoid this new alarm is acked.
                acknowledgeAlarm(person, AlarmType.UP);
                acknowledgeAlarm(person, AlarmType.SIDE);
                acknowledgeAlarm(person, AlarmType.AWAY);
            } 

            if (saveNewAlarm == true) {
                // Allow alarm to be displayed in GUI
                person.setHideUpAlarm(false);
                person.setHideSideAlarm(false);
                person.setHideAwayAlarm(false);
                person.setHideRingAlarm(false);
                // Save new alarm in DB
                Alarm savedAlarm = alarmRepo.save(bedAlarm);
                switch (bedAlarm.getType()) {
                case UP:
                    person.setUpAlarmId(savedAlarm.getId());
                    break;
                case SIDE:
                    person.setSideAlarmId(savedAlarm.getId());
                    break;
                case AWAY:
                    person.setAwayAlarmId(savedAlarm.getId());
                    break;
                case RING:
                    person.setRingAlarmId(savedAlarm.getId());
                    break;
                default:
                    break;
                }
            }

            ///////////////////////////////////
            // B2. process body movement
            ///////////////////////////////////
            long currentTime = new Date().getTime();
            if (bytePacket[15] == (byte)0xC5) {
                // save body movement times
                if (log.isDebugEnabled()) {
                    log.debug("Save move counter and generate move alarm accordingly.");
                }

                // increase body move count in cache
                cacheData.increaseMoveTotal();

                // Check below to see if a new alarm need to be fired
                if (!person.isMoveAlarmFlag()) {
                    long ackTime = 0;
                    ackTime = person.getMoveAlarmAckTime();
                    if (log.isDebugEnabled()) {
                        log.debug("Move alarm ackTime=" + ackTime);
                    }

                    long timePassed = currentTime - ackTime;
                    timePassed = (timePassed > 60 * 60 * 1000) ? 60 * 60 * 1000 : timePassed;
                    short bodyMoveDelta = cacheData.getBodyMoveDelta(timePassed);

                    // Raise body movement times over threshold alarm
                    if (log.isDebugEnabled()) {
                        log.debug("Called getBodyMoveDelta(): currentTime=" + currentTime + ", interval=" + timePassed + ", bodyMoveDelta=" + bodyMoveDelta);
                    }

                    if (bodyMoveDelta >= cacheData.getBodyMoveTH()) {
                        Alarm alarmMove = new Alarm();
                        alarmMove.setRaisedTime(currentTime);
                        alarmMove.setPersonId(longPersonId);
                        alarmMove.setDeviceId(strDevId);
                        alarmMove.setValue(bytePacket[15]);
                        alarmMove.setInstitutionId(person.getInstitution() == null ? 1 : person.getInstitution().getId());
                        alarmMove.setPersonName(person.getPersonName());
                        alarmMove.setType(AlarmType.MOVE);
                        alarmMove.setValue(bodyMoveDelta);
                        Alarm savedAlarm = alarmRepo.save(alarmMove);
                        person.setHideMoveAlarm(false);
                        person.setMoveAlarmFlag(true);
                        person.setMoveAlarmId(savedAlarm.getId());
                    }
                }
            } else if (bytePacket[15] == (byte)0x30) {
                // nothing to do
            } else {
                log.warn("This movement value is not supported yet: " + bytePacket[15]);
            }

            ///////////////////////////////////
            // B3. process wet alarm
            ///////////////////////////////////
            if (bytePacket[16] == (byte)0xC6) {
                if (log.isDebugEnabled()) {
                    log.debug("Save wet counter and wet alarm.");
                }

                if (!person.isWetAlarmFlag() || person.getWetAlarmId() == 0) {
                    if (log.isDebugEnabled()) {
                        log.info("Save wet alarm.");
                    }

                    Alarm wetAlarm = new Alarm();
                    wetAlarm.setRaisedTime(new Date().getTime());
                    wetAlarm.setPersonId(longPersonId);
                    wetAlarm.setDeviceId(strDevId);
                    wetAlarm.setValue(bytePacket[16]);
                    wetAlarm.setInstitutionId(person.getInstitution() == null ? 1 : person.getInstitution().getId());
                    wetAlarm.setPersonName(person.getPersonName());
                    wetAlarm.setType(com.wdl.datarest.data.AlarmType.WET);
                    Alarm savedAlarm = alarmRepo.save(wetAlarm);
                    person.setHideWetAlarm(false);
                    person.setWetAlarmFlag(true);
                    person.setWetAlarmId(savedAlarm.getId());
                }
            } else if (bytePacket[16] == (byte)0x30) {
                if (person.isWetAlarmFlag()) {
                    person.setWetAlarmFlag(false);
                    person.setHideWetAlarm(false);
                    acknowledgeAlarm(person, AlarmType.WET);
                }
            } else {
                log.info("This wet value is not supported yet: " + bytePacket[16]);
            }

            ///////////////////////////////////
            // B4. process turn over alarm
            ///////////////////////////////////
            Alarm turnOverAlarm = new Alarm();
            turnOverAlarm.setRaisedTime(new Date().getTime());
            turnOverAlarm.setPersonId(longPersonId);
            turnOverAlarm.setDeviceId(strDevId);
            turnOverAlarm.setValue(bytePacket[16]);
            turnOverAlarm.setInstitutionId(person.getInstitution() == null ? 1 : person.getInstitution().getId());
            turnOverAlarm.setPersonName(person.getPersonName());

            if (bytePacket[17] == (byte)0xC8) {
                if (log.isDebugEnabled()) {
                    log.debug("Save left turn over alarm.");
                }
                turnOverAlarm.setType(AlarmType.LEFT);
                Alarm savedAlarm = alarmRepo.save(turnOverAlarm);
                person.setHideLeftAlarm(false);
                person.setLeftAlarmFlag(true);
                person.setLeftAlarmId(savedAlarm.getId());
                LogUtils.logAlarm(turnOverAlarm);

                if (person.isRightAlarmFlag()) {
                    person.setRightAlarmFlag(false);
                    person.setHideRightAlarm(false);
                    acknowledgeAlarm(person, AlarmType.RIGHT);
                }
            } else if (bytePacket[17] == (byte)0xC9) {
                if (log.isDebugEnabled()) {
                    log.debug("Save right turn over alarm.");
                }
                turnOverAlarm.setType(AlarmType.RIGHT);
                Alarm savedAlarm = alarmRepo.save(turnOverAlarm);
                person.setHideRightAlarm(false);
                person.setRightAlarmFlag(true);
                person.setRightAlarmId(savedAlarm.getId());
                LogUtils.logAlarm(turnOverAlarm);

                if (person.isLeftAlarmFlag()) {
                    person.setLeftAlarmFlag(false);
                    person.setHideLeftAlarm(false);
                    acknowledgeAlarm(person, AlarmType.LEFT);
                }
            } else if (bytePacket[17] == (byte)0x30) {
                // nothing to do
            } else {
                log.info("This turn over value is not supported yet: " + bytePacket[17]);
            }
        } else if (bytePacket[1] == (byte)0xCC) {
            if (log.isDebugEnabled()) {
                log.info("Data type received is 0xCC device handshake.");
            }
        } else {
            log.warn("Data type received is not supported yet: " + String.format("%02X ", bytePacket[1]));
        }

        ///////////////////////////////////
        // update person table
        ///////////////////////////////////
        personRepo.save(person);
    }

    private boolean isValidPacket(byte[] bytePacket){
        if (bytePacket[0] != (byte)0xE2) {
            log.warn("Received frame header is unknown, being ignored: " + bytePacket[0]);
            return false;
        }

        if (bytePacket[1] != (byte)0xAA && bytePacket[1] != (byte)0xBB && bytePacket[1] != (byte)0xCC && bytePacket[1] != (byte)0xDB) {
            log.warn("Received frame type is unknown, being ignored: " + bytePacket[1]);
            return false;
        }

        if (bytePacket[1] == (byte)0xAA && bytePacket[3] != (byte)0xF) {
            log.warn("Received frame lenth is incorrect, being ignored: type=0xAA, length=" + String.format("%02X ", bytePacket[3]) + "(should be 0xF)");
            return false;
        }

        if (bytePacket[1] == 0xBB && bytePacket[3] != (byte)0xE) {
            log.warn("Received frame lenth is incorrect, being ignored: type=0xBB, length=" + String.format("%02X ", bytePacket[3]) + "(should be 0xE)");
            return false;
        }

        if (bytePacket[1] == 0xCC && bytePacket[3] != (byte)0xA) {
            log.warn("Received frame lenth is incorrect, being ignored: type=0xCC, " + ", length=" + String.format("%02X ", bytePacket[3]) + "(should be 0xA)");
            return false;
        }

        if (bytePacket[1] == (byte)0xAA && bytePacket[21] != (byte)0xE3) {
            log.warn("Received frame tail is incorrect, being ignored: " + bytePacket[21]);
            return false;
        }

        if (bytePacket[1] == (byte)0xBB && bytePacket[20] != (byte)0xE3) {
            log.warn("Received frame tail is incorrect, being ignored: " + bytePacket[20]);
            return false;
        }

        if (bytePacket[1] == (byte)0xCC && bytePacket[16] != (byte)0xE3) {
            log.warn("Received frame tail is incorrect, being ignored: " + bytePacket[16]);
            return false;
        }

        return true;
    }

    private void acknowledgeAlarm(Person person, AlarmType type) {
        long alarmId = 0;

        switch(type) {
        case HEART:
            alarmId = person.getHeartAlarmId();
            if (alarmId > 0) {
                Alarm alarm = alarmRepo.findByIndex(alarmId);
                if (alarm != null) {
                    alarm.setAckedTime(System.currentTimeMillis());
                    alarmRepo.save(alarm);
                    LogUtils.logAlarm(alarm);
                }
                person.setHeartAlarmId(0);
            }
            break;
        case BREATHE:
            alarmId = person.getBreatheAlarmId();
            if (alarmId > 0) {
                Alarm alarm = alarmRepo.findByIndex(alarmId);
                if (alarm != null) {
                    alarm.setAckedTime(System.currentTimeMillis());
                    alarmRepo.save(alarm);
                    LogUtils.logAlarm(alarm);
                }
                person.setBreatheAlarmId(0);
            }
            break;
        case UP:
            alarmId = person.getUpAlarmId();
            if (alarmId > 0) {
                Alarm alarm = alarmRepo.findByIndex(alarmId);
                if (alarm != null) {
                    alarm.setAckedTime(System.currentTimeMillis());
                    alarmRepo.save(alarm);
                    LogUtils.logAlarm(alarm);
                }
                person.setUpAlarmId(0);
            }
            break;
        case SIDE:
            alarmId = person.getSideAlarmId();
            if (alarmId > 0) {
                Alarm alarm = alarmRepo.findByIndex(alarmId);
                if (alarm != null) {
                    alarm.setAckedTime(System.currentTimeMillis());
                    alarmRepo.save(alarm);
                    LogUtils.logAlarm(alarm);
                }
                person.setSideAlarmId(0);
            }
            break;
        case AWAY:
            alarmId = person.getAwayAlarmId();
            if (alarmId > 0) {
                Alarm alarm = alarmRepo.findByIndex(alarmId);
                if (alarm != null) {
                    alarm.setAckedTime(System.currentTimeMillis());
                    alarmRepo.save(alarm);
                    LogUtils.logAlarm(alarm);
                }
                person.setAwayAlarmId(0);
            }
            break;
        case WET:
            alarmId = person.getWetAlarmId();
            if (alarmId > 0) {
                Alarm alarm = alarmRepo.findByIndex(alarmId);
                if (alarm != null) {
                    alarm.setAckedTime(System.currentTimeMillis());
                    alarmRepo.save(alarm);
                    LogUtils.logAlarm(alarm);
                }
                person.setWetAlarmId(0);
            }
            break;
        case MOVE: 
            alarmId = person.getMoveAlarmId();
            if (alarmId > 0) {
                Alarm alarm = alarmRepo.findByIndex(alarmId);
                if (alarm != null) {
                    alarm.setAckedTime(System.currentTimeMillis());
                    alarmRepo.save(alarm);
                    LogUtils.logAlarm(alarm);
                }
                person.setMoveAlarmId(0);
            }
            break;
        case DEVICE:
            alarmId = person.getDeviceAlarmId();
            if (alarmId > 0) {
                Alarm alarm = alarmRepo.findByIndex(alarmId);
                if (alarm != null) {
                    alarm.setAckedTime(System.currentTimeMillis());
                    alarmRepo.save(alarm);
                    LogUtils.logAlarm(alarm);
                }
                person.setDeviceAlarmId(0);
            }
            break;
        case RING:
            alarmId = person.getRingAlarmId();
            if (alarmId > 0) {
                Alarm alarm = alarmRepo.findByIndex(alarmId);
                if (alarm != null) {
                    alarm.setAckedTime(System.currentTimeMillis());
                    alarmRepo.save(alarm);
                    LogUtils.logAlarm(alarm);
                }
                person.setRingAlarmId(0);
            }
            break;
        case TURN_OVER:
            alarmId = person.getTurnOverReminderAlarmId();
            if (alarmId > 0) {
                Alarm alarm = alarmRepo.findByIndex(alarmId);
                if (alarm != null) {
                    alarm.setAckedTime(System.currentTimeMillis());
                    alarmRepo.save(alarm);
                    LogUtils.logAlarm(alarm);
                }
                person.setTurnOverReminderAlarmId(0);
            }

            // check if need to schedule next reminder
            if (person.getTurnOverReminder() > 0 && person.getCheckoutTime() <= 0 && person.getDevice1() > 0) {
                CacheData cacheData = devCnfgCache.getCnfgDataFromCacheByPersonId(person.getId());
                if (cacheData == null) {
                    log.error("CacheData is null for personName=" + person.getPersonName() + " while acking turn over alarm.");
                } else {
                    log.info("Schedule turn over reminder fo personName=" + person.getPersonName() + ", interval=" + person.getTurnOverReminder() + " minutes.");
                    devCnfgCache.scheduleTurnOverReminder(person.getId(), person.getTurnOverReminder());
                    cacheData.setSentReminder(true);
                }
            }
            break;
        case LEFT:
            alarmId = person.getLeftAlarmId();
            if (alarmId > 0) {
                Alarm alarm = alarmRepo.findByIndex(alarmId);
                if (alarm != null) {
                    alarm.setAckedTime(System.currentTimeMillis());
                    alarmRepo.save(alarm);
                    LogUtils.logAlarm(alarm);
                }
                person.setLeftAlarmId(0);
            }
            break;
        case RIGHT:
            alarmId = person.getRightAlarmId();
            if (alarmId > 0) {
                Alarm alarm = alarmRepo.findByIndex(alarmId);
                if (alarm != null) {
                    alarm.setAckedTime(System.currentTimeMillis());
                    alarmRepo.save(alarm);
                    LogUtils.logAlarm(alarm);
                }
                person.setRightAlarmId(0);
            }
            break;
        default:
            break;
        }
    }

    public int getQueueSize() {
        return queue.size();
    }

    public String getName() {
        return thread.getName();
    }
}